package com.ruyuan.little.project.rocketmq.api.order.service.impl;

import com.alibaba.fastjson.JSON;
import com.ruyuan.little.project.common.enums.MessageTypeEnum;
import com.ruyuan.little.project.rocketmq.api.order.dto.OrderInfoDTO;
import com.ruyuan.little.project.rocketmq.api.order.dto.OrderMessageDTO;
import com.ruyuan.little.project.rocketmq.api.order.service.OrderEventInformManager;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Publishes order lifecycle events (create, cancel, pay, confirm, finish) to RocketMQ.
 *
 * <p>Lifecycle events are sent to the order topic through a queue selector keyed on the
 * order id, so all events of one order are routed to the same message queue. Order creation
 * additionally publishes a delayed message to the delay topic.
 *
 * <p>Sending is best-effort: send failures are logged (with stack trace) and not rethrown.
 */
@Service
public class OrderEventInformManagerImpl implements OrderEventInformManager {

    /**
     * Logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderEventInformManagerImpl.class);

    /**
     * Producer used for every order message sent by this class.
     */
    @Resource(name = "orderMqProducer")
    private DefaultMQProducer orderMqProducer;

    /**
     * Topic for order lifecycle messages.
     */
    @Value("${rocketmq.order.topic}")
    private String orderTopic;

    /**
     * Topic for delayed order messages.
     */
    @Value("${rocketmq.order.delay.topic}")
    private String orderDelayTopic;

    /**
     * Delay level applied to delayed order messages.
     */
    @Value("${rocketmq.order.delay.level}")
    private Integer orderDelayLevel;

    /**
     * Publishes the "order created" event, followed by the delayed message for the same order.
     *
     * @param orderInfoDTO the order that was created
     */
    @Override
    public void informCreateOrderEvent(OrderInfoDTO orderInfoDTO) {
        this.sendOrderMessage(MessageTypeEnum.WX_CREATE_ORDER, orderInfoDTO);

        // Delayed message used for order payment timeout handling.
        this.sendOrderDelayMessage(orderInfoDTO);
    }

    /**
     * Publishes the "order cancelled" event.
     *
     * @param orderInfoDTO the order that was cancelled
     */
    @Override
    public void informCancelOrderEvent(OrderInfoDTO orderInfoDTO) {
        this.sendOrderMessage(MessageTypeEnum.WX_CANCEL_ORDER, orderInfoDTO);
    }

    /**
     * Publishes the "order paid" event.
     *
     * @param orderInfo the order that was paid
     */
    @Override
    public void informPayOrderEvent(OrderInfoDTO orderInfo) {
        this.sendOrderMessage(MessageTypeEnum.WX_PAY_ORDER, orderInfo);
    }

    /**
     * Publishes the "order confirmed" (check-in) event.
     *
     * @param orderInfo the order that was confirmed
     */
    @Override
    public void informConfirmOrderEvent(OrderInfoDTO orderInfo) {
        this.sendOrderMessage(MessageTypeEnum.WX_CONFIRM_ORDER, orderInfo);
    }

    /**
     * Publishes the "order finished" (check-out) event.
     *
     * @param orderInfo the order that was finished
     */
    @Override
    public void informOrderFinishEvent(OrderInfoDTO orderInfo) {
        this.sendOrderMessage(MessageTypeEnum.WX_FINISHED_ORDER, orderInfo);
    }

    /**
     * Sends the delayed message for a newly created order.
     *
     * <p>The body is the JSON form of the order, encoded as UTF-8. The original note on this
     * method says it drives automatic cancellation of orders left unpaid for 30 minutes; the
     * actual delay is determined by the configured delay level.
     *
     * <p>Failures are logged and not rethrown.
     *
     * @param orderInfoDTO the order to publish
     */
    private void sendOrderDelayMessage(OrderInfoDTO orderInfoDTO) {
        Message message = new Message();
        message.setTopic(orderDelayTopic);
        message.setDelayTimeLevel(orderDelayLevel);
        message.setBody(JSON.toJSONString(orderInfoDTO).getBytes(StandardCharsets.UTF_8));

        try {
            orderMqProducer.send(message);
            LOGGER.info("send order delay pay message finished orderNo:{} delayTimeLevel:{}", orderInfoDTO.getOrderNo(), orderDelayLevel);
        } catch (Exception e) {
            // Pass the throwable as the trailing argument so SLF4J records the stack trace and cause.
            LOGGER.error("send order delay message fail,error message:{}", e.getMessage(), e);
        }
    }

    /**
     * Sends an order lifecycle message with per-order queue routing.
     *
     * <p>The order is serialized to JSON and wrapped in an {@link OrderMessageDTO} carrying the
     * message type; the wrapper is serialized to JSON (UTF-8) as the message body. The order id
     * is passed as the selector argument so every message of one order goes to the same queue.
     *
     * <p>Failures are logged and not rethrown.
     *
     * @param messageTypeEnum the kind of lifecycle event
     * @param orderInfoDTO    the order the event refers to
     */
    private void sendOrderMessage(MessageTypeEnum messageTypeEnum, OrderInfoDTO orderInfoDTO) {

        OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
        orderMessageDTO.setMessageTypeEnum(messageTypeEnum);
        orderMessageDTO.setContent(JSON.toJSONString(orderInfoDTO));

        Message message = new Message();
        message.setTopic(orderTopic);
        message.setBody(JSON.toJSONString(orderMessageDTO).getBytes(StandardCharsets.UTF_8));

        try {
            SendResult sendResult = orderMqProducer.send(
                    message, OrderEventInformManagerImpl::selectQueueByOrderId, orderInfoDTO.getId());
            LOGGER.info("send order message finished messageTypeEnum:{}, orderNo:{}，sendResult:{}",
                    messageTypeEnum, orderInfoDTO.getOrderNo(), JSON.toJSONString(sendResult));
        } catch (Exception e) {
            // Pass the throwable as the trailing argument so SLF4J records the stack trace and cause.
            LOGGER.error("send order message fail,error message:{}", e.getMessage(), e);
        }

    }

    /**
     * Picks the queue for an order by hashing its id onto the available queues.
     *
     * <p>{@code Math.floorMod} keeps the index non-negative even for a negative hash, and using
     * {@code hashCode()} avoids a hard cast to {@code Integer}. For non-negative {@code Integer}
     * ids the result equals {@code id % mqs.size()}.
     *
     * @param mqs     candidate queues supplied by the producer
     * @param message the message being sent (unused)
     * @param orderId the routing key passed to {@code send}; must not be null
     * @return the queue at index {@code floorMod(orderId.hashCode(), mqs.size())}
     * @throws IllegalArgumentException if {@code orderId} is null
     */
    private static MessageQueue selectQueueByOrderId(List<MessageQueue> mqs, Message message, Object orderId) {
        if (orderId == null) {
            throw new IllegalArgumentException("order id must not be null when selecting a message queue");
        }
        int index = Math.floorMod(orderId.hashCode(), mqs.size());
        return mqs.get(index);
    }
}
